/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.api.service.impl;

import com.chinamobile.cmss.lakehouse.api.service.MetaDataService;
import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.dto.MetaDataNodeDto;
import com.chinamobile.cmss.lakehouse.common.dto.PageBean;
import com.chinamobile.cmss.lakehouse.common.dto.SearchMetadataTableDto;
import com.chinamobile.cmss.lakehouse.common.dto.SqlExecuteContextDto;
import com.chinamobile.cmss.lakehouse.common.dto.TableDto;
import com.chinamobile.cmss.lakehouse.common.dto.TableNameDto;
import com.chinamobile.cmss.lakehouse.common.enums.ExecuteTaskType;
import com.chinamobile.cmss.lakehouse.common.enums.HttpStatus;
import com.chinamobile.cmss.lakehouse.common.enums.Status;
import com.chinamobile.cmss.lakehouse.common.enums.Symbol;
import com.chinamobile.cmss.lakehouse.common.utils.Result;
import com.chinamobile.cmss.lakehouse.dao.HiveTableDao;
import com.chinamobile.cmss.lakehouse.dao.TableTaskInfoDao;
import com.chinamobile.cmss.lakehouse.dao.entity.TableTaskInfoEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.UserEntity;
import com.chinamobile.cmss.lakehouse.service.hive.HiveAccessService;
import com.chinamobile.cmss.lakehouse.service.hive.HiveSqlExecutor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLAlterStatement;
import com.alibaba.druid.sql.ast.statement.SQLCreateStatement;
import com.alibaba.druid.sql.visitor.SchemaStatVisitor;
import com.alibaba.druid.stat.TableStat;
import com.alibaba.druid.util.JdbcConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class MetaDataServiceImpl extends BaseServiceImpl implements MetaDataService {

    /** Fallback database name when a table reference carries no explicit "db." prefix. */
    private static final String DEFAULT_DATABASE = "default";

    /** Segment count of a fully qualified "database.table" reference. */
    private static final int QUALIFIED_TABLE_PARTS = 2;

    @Autowired
    private HiveAccessService hiveAccessService;

    @Autowired
    private TableTaskInfoDao tableTaskInfoDao;

    @Autowired
    private HiveTableDao hiveTableDao;

    public MetaDataServiceImpl() {
    }

    /**
     * Checks whether a Hive table exists, optionally verifying the user's access first.
     *
     * <p>Best-effort: any failure (permission denied, metastore error, missing table)
     * is logged and reported as "does not exist" rather than propagated.
     *
     * @param userName        user performing the lookup
     * @param databaseName    Hive database name
     * @param tableName       Hive table name
     * @param checkPermission when {@code true}, run the access-permission check before the lookup
     * @return {@code true} only if the table could be fetched from the metastore
     */
    public boolean checkTableIfExist(String userName, String databaseName, String tableName, boolean checkPermission) {
        try {
            if (checkPermission) {
                hiveAccessService.accessPermissionCheck(userName, TableNameDto.builder()
                    .databaseName(databaseName)
                    .tableName(tableName)
                    .build());
            }
            return hiveAccessService.getTable(databaseName, tableName, userName) != null;
        } catch (Exception e) {
            // Deliberately swallowed: callers treat an unreadable table the same as a missing one.
            log.error("checkTableExists failed.", e);
            return false;
        }
    }

    /**
     * Executes one parsed SQL statement on Hive, recording DDL task bookkeeping first.
     *
     * <p>For ALTER/CREATE statements the operated table is extracted via Druid's
     * {@link SchemaStatVisitor}; for DATA_DISCOVERY tasks an already-existing table
     * suppresses both the bookkeeping write and the SQL execution.
     *
     * <p>NOTE(review): this {@code @Transactional} method appears to be reached only by
     * self-invocation from {@link #sqlExecuteProxy}, in which case Spring's transaction
     * proxy is bypassed and no transaction is started — confirm whether that is intended.
     *
     * @param userName             user on whose behalf the SQL runs
     * @param sqlExecuteContextDto execution context (task id / type)
     * @param hiveSqlExecutor      already-opened executor bound to the user
     * @param sqlStatement         single parsed statement
     * @throws Exception if a DDL statement references no table, or execution fails
     */
    @Transactional(rollbackFor = Exception.class)
    public void execute(String userName, SqlExecuteContextDto sqlExecuteContextDto, HiveSqlExecutor hiveSqlExecutor, SQLStatement sqlStatement) throws Exception {
        String sql = SQLUtils.toSQLString(sqlStatement, JdbcConstants.HIVE);
        log.info("execute sql: {}", sql);
        // Hive rejects a trailing ';' on a single statement.
        if (sql.endsWith(Symbol.SEMICOLON.getSymbol())) {
            sql = sql.substring(0, sql.length() - 1);
        }

        boolean isTableExist = false;
        if (sqlStatement instanceof SQLAlterStatement || sqlStatement instanceof SQLCreateStatement) {
            final SchemaStatVisitor statVisitor = SQLUtils.createSchemaStatVisitor(JdbcConstants.HIVE);
            sqlStatement.accept(statVisitor);
            final Map<TableStat.Name, TableStat> tables = statVisitor.getTables();
            if (tables.isEmpty()) {
                // Fixed message: the check requires AT LEAST one table, not "more than one".
                throw new Exception("DDL statement should have at least one operate table");
            }
            // Only the first referenced table is tracked; DDL normally targets exactly one.
            final TableStat.Name name = tables.keySet().iterator().next();
            final String[] split = StringUtils.split(name.getName(), ".");
            String databaseName;
            String tableName;
            if (split.length == QUALIFIED_TABLE_PARTS) {
                databaseName = split[0];
                tableName = split[1];
            } else {
                databaseName = DEFAULT_DATABASE;
                tableName = split[0];
            }

            if (ExecuteTaskType.DATA_DISCOVERY.getCnName().equals(sqlExecuteContextDto.getRefTaskType())) {
                isTableExist = checkTableIfExist(userName, databaseName, tableName, true);
            }
            log.info("isTableExist: {}", isTableExist);

            if (!isTableExist) {
                // Upsert the task/table association so the table can be traced back to its task.
                Optional<TableTaskInfoEntity> taskInfo = tableTaskInfoDao.findByDbNameAndTableName(databaseName, tableName);
                final TableTaskInfoEntity tableTaskInfo = taskInfo.orElse(new TableTaskInfoEntity());
                tableTaskInfo.setTaskId(sqlExecuteContextDto.getTaskId());
                tableTaskInfo.setTaskType(ExecuteTaskType.forValues(sqlExecuteContextDto.getRefTaskType()));
                tableTaskInfo.setCreator(userName);
                // Epoch seconds truncated to int — entity field is int; overflows in 2038.
                tableTaskInfo.setCreateTime((int) (System.currentTimeMillis() / 1000));
                tableTaskInfo.setTableName(tableName);
                tableTaskInfo.setDbName(databaseName);
                tableTaskInfoDao.save(tableTaskInfo);
            }
        }
        if (!isTableExist) {
            hiveSqlExecutor.execute(sql);
        }
    }

    /**
     * Searches tables visible to the user, restricted to databases the user can access.
     *
     * @return map with {@link Constants#DATA_LIST} → paged {@link TableDto} results
     */
    @Override
    public Map<String, Object> searchUserTable(UserEntity loginUser, SearchMetadataTableDto searchMetadataTableDto) {
        Map<String, Object> result = new HashMap<>();
        log.info("user {} search user table, request body {}", loginUser.getUserName(), searchMetadataTableDto);
        Set<String> userDatabases = hiveAccessService.getDatabasesByUser(loginUser.getUserName());
        PageBean<TableDto> res = hiveTableDao.searchTable(userDatabases, searchMetadataTableDto);
        result.put(Constants.DATA_LIST, res);
        return result;
    }

    /**
     * Lists the databases the user may access.
     *
     * @return map with {@link Constants#DATA_LIST} → database names, plus a SUCCESS status
     */
    public Map<String, Object> getUserDatabases(UserEntity loginUser) {
        Map<String, Object> result = new HashMap<>();
        Set<String> userDatabases = hiveAccessService.getDatabasesByUser(loginUser.getUserName());
        result.put(Constants.DATA_LIST, new ArrayList<>(userDatabases));
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Parses the submitted SQL into statements and executes each one as the user.
     *
     * <p>On a checked failure the error is recorded in the result map (keyed by HTTP
     * status) instead of being thrown; runtime exceptions propagate unchanged.
     *
     * @return empty map on success; otherwise a map carrying the error entry
     */
    @Override
    public Map<String, Object> sqlExecuteProxy(UserEntity loginUser, SqlExecuteContextDto sqlExecuteContextDto) {
        Map<String, Object> result = new HashMap<>();

        log.info("sqlExecutorProxy([{}])", sqlExecuteContextDto);
        final String sql = sqlExecuteContextDto.getSql();
        List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.HIVE);
        if (sqlStatements.isEmpty()) {
            result.put(HttpStatus.INTERNAL_SERVER_ERROR.toString(), "Statement must greater than zero");
            // Bug fix: previously fell through and still opened a Hive executor with nothing to run.
            return result;
        }
        try (HiveSqlExecutor hiveSqlExecutor = hiveAccessService.getHiveSqlExecutor(loginUser.getUserName())) {
            for (SQLStatement sqlStatement : sqlStatements) {
                execute(loginUser.getUserName(), sqlExecuteContextDto, hiveSqlExecutor, sqlStatement);
            }
        } catch (RuntimeException e) {
            // Let programming errors surface (and trigger the caller's error handling).
            throw e;
        } catch (Exception e) {
            log.error("sqlExecuteProxy failed.", e);
            result.put(HttpStatus.BAD_REQUEST.toString(), e);
        }
        return result;
    }

    /**
     * Deletes a table after verifying the user's permission on it.
     */
    @Override
    public void deleteTable(UserEntity loginUser, TableNameDto tableNameDto) {
        hiveAccessService.accessPermissionCheck(loginUser.getUserName(), tableNameDto);
        hiveAccessService.deleteTable(loginUser.getUserName(), tableNameDto);
    }

    /**
     * Builds the full metadata tree (database → tables → columns) for the user.
     *
     * @return map with {@link Constants#DATA_LIST} → database nodes with nested children
     */
    @Override
    public Map<String, List<MetaDataNodeDto>> metadataTree(UserEntity loginUser) {
        Map<String, List<MetaDataNodeDto>> result = new HashMap<>();

        String userName = loginUser.getUserName();
        final Set<String> userDatabases = hiveAccessService.getDatabasesByUser(userName);
        log.info("user databases: [{}]", String.join(",", userDatabases));
        List<MetaDataNodeDto> databaseDtos = new ArrayList<>(userDatabases.size());
        for (String databaseName : userDatabases) {
            MetaDataNodeDto databaseDto = MetaDataNodeDto.create(databaseName, MetaDataNodeDto.TitleType.database, false, null);
            final List<Table> tables = hiveAccessService.getTables(databaseName, userName);
            List<MetaDataNodeDto> tableDtos = new ArrayList<>(tables.size());
            for (Table table : tables) {
                MetaDataNodeDto tableDto = MetaDataNodeDto.create(table.getTableName(), MetaDataNodeDto.TitleType.table, false, null);
                // Tree nodes carry no parent name (null), matching the original behavior.
                tableDto.setChildren(buildColumnNodes(table, null));
                tableDtos.add(tableDto);
            }
            databaseDto.setChildren(tableDtos);
            databaseDtos.add(databaseDto);
        }
        result.put(Constants.DATA_LIST, databaseDtos);
        return result;
    }

    /**
     * Returns only the database level of the metadata tree (children left null,
     * to be lazily loaded via {@link #getTables} / {@link #getColumns}).
     */
    public Result metadataDatabaseTree(UserEntity loginUser) {
        Result<Object> result = new Result<>();

        String userName = loginUser.getUserName();
        final Set<String> userDatabases = hiveAccessService.getDatabasesByUser(userName);
        log.info("user databases: [{}]", String.join(",", userDatabases));
        List<MetaDataNodeDto> databaseDtos = new ArrayList<>(userDatabases.size());
        for (String databaseName : userDatabases) {
            MetaDataNodeDto databaseDto = MetaDataNodeDto.create(databaseName, MetaDataNodeDto.TitleType.database, false, null);
            databaseDto.setChildren(null);
            databaseDtos.add(databaseDto);
        }

        result.setData(databaseDtos);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Lists the tables of one database as childless tree nodes (parent = database name).
     */
    @Override
    public Result getTables(UserEntity loginUser, String databaseName) {
        Result<Object> result = new Result<>();

        final List<Table> tables = hiveAccessService.getTables(databaseName, loginUser.getUserName());
        List<MetaDataNodeDto> tableDtos = new ArrayList<>(tables.size());
        for (Table table : tables) {
            MetaDataNodeDto tableDto = MetaDataNodeDto.create(table.getTableName(), MetaDataNodeDto.TitleType.table, false, databaseName);
            tableDto.setChildren(null);
            tableDtos.add(tableDto);
        }
        result.setData(tableDtos);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Lists the columns of one table (partition keys first, then regular columns),
     * each node carrying the table name as its parent.
     */
    @Override
    public Result getColumns(UserEntity loginUser, String databaseName, String tableName) {
        Result<Object> result = new Result<>();

        final Table table = hiveAccessService.getTable(databaseName, tableName, loginUser.getUserName());
        result.setData(buildColumnNodes(table, tableName));
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Builds leaf column nodes for a table: partition keys first, then storage-descriptor
     * columns. Extracted from the formerly duplicated loops in {@code metadataTree} and
     * {@code getColumns}.
     *
     * @param table      Hive table whose columns are listed
     * @param parentName parent node name to attach ({@code null} for the full tree view)
     */
    private List<MetaDataNodeDto> buildColumnNodes(Table table, String parentName) {
        final StorageDescriptor sd = table.getSd();
        List<MetaDataNodeDto> columnDtos = new ArrayList<>(table.getPartitionKeysSize() + sd.getColsSize());
        for (FieldSchema partitionKey : table.getPartitionKeys()) {
            columnDtos.add(MetaDataNodeDto.create(partitionKey.getName(),
                MetaDataNodeDto.TitleType.column, true, parentName));
        }
        for (FieldSchema col : sd.getCols()) {
            columnDtos.add(MetaDataNodeDto.create(col.getName(),
                MetaDataNodeDto.TitleType.column, true, parentName));
        }
        return columnDtos;
    }

    /**
     * {@link UserEntity} overload of {@link #checkTableIfExist(String, String, String, boolean)};
     * was a byte-for-byte duplicate, now delegates.
     */
    @Override
    public boolean checkTableIfExist(UserEntity loginUser, String databaseName, String tableName, boolean checkPermission) {
        return checkTableIfExist(loginUser.getUserName(), databaseName, tableName, checkPermission);
    }

}
